home *** CD-ROM | disk | FTP | other *** search
/ PCGUIA 127 / PC Guia 127.iso / Software / Produtividade / OpenOffice.org 2.0.1 / openofficeorg4.cab / test_threading.py < prev    next >
Text File  |  2005-11-19  |  1KB  |  56 lines

  1. # Very rudimentary test of threading module
  2.  
  3. # Create a bunch of threads, let each do some work, wait until all are done
  4.  
  5. from test.test_support import verbose
  6. import random
  7. import threading
  8. import time
  9.  
  10. # This takes about n/3 seconds to run (about n/3 clumps of tasks, times
  11. # about 1 second per clump).
  12. numtasks = 10
  13.  
  14. # no more than 3 of the 10 can run at once
  15. sema = threading.BoundedSemaphore(value=3)
  16. mutex = threading.RLock()
  17. running = 0
  18.  
  19. class TestThread(threading.Thread):
  20.     def run(self):
  21.         global running
  22.         delay = random.random() * 2
  23.         if verbose:
  24.             print 'task', self.getName(), 'will run for', delay, 'sec'
  25.         sema.acquire()
  26.         mutex.acquire()
  27.         running = running + 1
  28.         if verbose:
  29.             print running, 'tasks are running'
  30.         mutex.release()
  31.         time.sleep(delay)
  32.         if verbose:
  33.             print 'task', self.getName(), 'done'
  34.         mutex.acquire()
  35.         running = running - 1
  36.         if verbose:
  37.             print self.getName(), 'is finished.', running, 'tasks are running'
  38.         mutex.release()
  39.         sema.release()
  40.  
  41. threads = []
  42. def starttasks():
  43.     for i in range(numtasks):
  44.         t = TestThread(name="<thread %d>"%i)
  45.         threads.append(t)
  46.         t.start()
  47.  
  48. starttasks()
  49.  
  50. if verbose:
  51.     print 'waiting for all tasks to complete'
  52. for t in threads:
  53.     t.join()
  54. if verbose:
  55.     print 'all tasks done'
  56.